package com.sencorsta.ids.core.tcp.socket.client;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.sencorsta.ids.core.function.FunctionSystem;
import com.sencorsta.ids.core.log.Out;
import com.sencorsta.ids.core.tcp.socket.protocol.BaseMessage;

import io.netty.channel.Channel;

/**
 * Client-side connection worker.
 *
 * <p>Owns one outbound {@link Channel}, a queue of pending messages, and a
 * loop (see {@link #run()}) that drains the queue onto the channel,
 * sends a ping when the queue has been idle, and reconnects whenever the
 * connection has been flagged as lost via {@link #close(Channel)}.
 *
 * <p>{@link #addSend(BaseMessage)} and {@link #close(Channel)} may be invoked
 * from threads other than the worker thread, which is why {@code channel} and
 * {@code disconnect} are {@code volatile}.
 *
 * @author ICe
 */
public abstract class RpcClientWorker implements Runnable, RpcClientCallback {

	/** Queue length above which {@link #addSend(BaseMessage)} logs a warning. */
	private static final int __WARN_COUNT__ = 10000;

	/** Messages waiting to be written to the channel by the worker loop. */
	protected final BlockingQueue<BaseMessage> __QUEUE__ = new LinkedBlockingQueue<>();

	/** Host passed to {@code bootstrap.connect}. */
	protected String serverHost;
	/** Port passed to {@code bootstrap.connect}. */
	protected int serverPort;

	/** Connector used for the initial connection and for every reconnect. */
	protected RpcClientBootstrap bootstrap;

	/**
	 * Currently bound channel, or {@code null} while no connection is available.
	 * Volatile because the worker thread reassigns it during reconnect while
	 * other threads read it in {@link #addSend(BaseMessage)} and
	 * {@link #close(Channel)}.
	 */
	protected volatile Channel channel;
	/** {@code true} while the worker has no usable connection and must (re)connect. */
	protected volatile boolean disconnect = true;

	/** Worker name; also used as the name of the thread created by {@link #start()}. */
	protected String name;

	/**
	 * @return the worker name
	 */
	public String getName() {
		return name;
	}

	/**
	 * Creates a worker named after the concrete subclass's simple class name.
	 */
	public RpcClientWorker() {
		this.name = this.getClass().getSimpleName();
	}

	/**
	 * Enqueues a message for sending and warns when the queue grows too long.
	 *
	 * <p>The message is silently dropped when no channel is currently bound.
	 *
	 * @param message the message to enqueue
	 */
	public void addSend(BaseMessage message) {
		if (channel == null) {
			return;
		}
		__QUEUE__.add(message);
		// Read the size once so the logged value is the one that triggered the warning.
		int queued = size();
		if (queued > __WARN_COUNT__) {
			Out.warn(name + "发送队列太长: " + queued);
		}
	}

	/**
	 * @return the number of messages currently waiting in the send queue
	 */
	public int size() {
		return __QUEUE__.size();
	}

	/**
	 * Connects to the configured server, binds the resulting channel and starts
	 * the worker loop on a new thread named after this worker.
	 *
	 * @throws Exception if {@code bootstrap.connect} returns {@code null}
	 */
	public void start() throws Exception {
		Channel connected = bootstrap.connect(serverHost, serverPort);
		if (connected == null) {
			throw new Exception("连接失败");
		}
		bind(connected);
		new Thread(this, name).start();
	}

	/**
	 * Worker loop: reconnects when flagged as disconnected, otherwise waits up to
	 * 60 seconds for a queued message and sends it, or calls {@code ping()} when
	 * the wait times out.
	 *
	 * <p>The loop ends only when the running thread is interrupted while waiting
	 * on the queue; every other exception is logged and the loop continues.
	 */
	@Override
	public void run() {
		while (true) {
			try {
				if (disconnect) {
					String connName = Thread.currentThread().getName();
					Out.info("开始重连" + connName);
					// Retry every 5 seconds until a channel is obtained.
					while ((channel = bootstrap.connect(serverHost, serverPort)) == null) {
						Out.warn("无法重连" + connName);
						FunctionSystem.waitSeconds(5);
					}
					Out.info("成功重连" + connName);
					bind(channel);
				}

				BaseMessage msg = __QUEUE__.poll(60, TimeUnit.SECONDS);
				if (msg == null) {
					// Nothing to send within the timeout: send a ping instead.
					// NOTE(review): ping() is not declared in this class; presumably
					// it comes from RpcClientCallback or the subclass - confirm.
					ping();
				} else {
					send(msg);
				}
			} catch (InterruptedException e) {
				// Restore the interrupt status and stop, instead of swallowing the
				// interrupt and looping forever.
				Thread.currentThread().interrupt();
				Out.warn(name + " interrupted, worker stopped");
				return;
			} catch (Exception e) {
				Out.error(e);
				// NOTE(review): kept alongside Out.error because its output format
				// is not visible here; drop if Out.error already prints the trace.
				e.printStackTrace();
			}
		}
	}

	/**
	 * Writes and flushes a message on the current channel, or traces a
	 * cancellation when no channel is bound.
	 *
	 * @param message the message to write
	 */
	protected void send(BaseMessage message) {
		// Single read so the null check and the write use the same channel.
		Channel target = channel;
		if (target != null) {
			target.writeAndFlush(message);
			Out.trace("RpcClientWorker开始send:" + message);
		} else {
			Out.trace("RpcClientWorker开始send,channel为空:", "取消发送");
		}
	}

	/**
	 * Hook invoked by {@link #bind(Channel)} after the channel is stored and the
	 * queue is cleared, but before the worker is marked as connected.
	 */
	public abstract void doStart();

	/**
	 * Adopts the given channel as the active connection: stores it, discards
	 * every queued message, runs {@link #doStart()}, clears the disconnect flag
	 * and finally calls {@link #onWorkerReady()}. Does nothing for {@code null}.
	 *
	 * @param channel the newly connected channel; ignored when {@code null}
	 */
	public void bind(Channel channel) {
		if (channel == null) {
			return;
		}
		this.channel = channel;
		// Messages queued for the previous connection are dropped, not replayed.
		__QUEUE__.clear();
		doStart();
		disconnect = false;
		onWorkerReady();
	}

	/**
	 * Called at the end of a successful {@link #bind(Channel)}; traces the binding.
	 */
	protected void onWorkerReady() {
		Out.trace(name, " -> ", "绑定成功 ", channel);
	}

	/**
	 * Flags the worker as disconnected when the given channel is the one
	 * currently bound, so the worker loop reconnects on its next iteration.
	 * Calls for any other channel are ignored.
	 *
	 * @param channel the channel reported as closed
	 */
	public void close(Channel channel) {
		if (this.channel == channel) {
			disconnect = true;
			Out.trace(name + " close!!!");
		}
	}

}
